package xin.alum.aim.groups;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.ChannelMatcher;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import xin.alum.aim.AIM;
import xin.alum.aim.constant.AIMConstant;
import xin.alum.aim.constant.ChannelAttr;
import xin.alum.aim.constant.ChannelClose;
import xin.alum.aim.constant.ChannelPlatform;
import xin.alum.aim.handler.BaseServerHandler;
import xin.alum.aim.model.Reply;
import xin.alum.aim.model.Transportable;
import xin.alum.aim.util.ApplicationContextUtil;

import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Optional;

/**
 * Manages all online connections held by this node. Group clustering is not supported here;
 * use SessionGroups for cluster-wide pushes.
 *
 * @author alum(alum @ live.cn)
 * @date 2021/8/3 14:52
 */
@Component
public class Sessions extends DefaultChannelGroup {
    /**
     * 区分
     */
    public static final String PREFIX_BIND_USER_GROUP = "AIM_USER_";

    /**
     * 所有在线用户
     */
    public static final String ALL_SESSIONS = "AIM-ALL-SESSIONS";

    @Autowired
    private SessionGroups groups;

    @Autowired
    private ClusterProperties clusterProperties;

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(Sessions.class);

    /**
     *
     */
    public Sessions() {
        super(ALL_SESSIONS, GlobalEventExecutor.INSTANCE);
    }

    /**
     * @param name
     */
    public Sessions(String name) {
        super(name, GlobalEventExecutor.INSTANCE);
    }


    /**
     * 绑定用户通道到Sessions
     *
     * @param ch
     */
    private Boolean bind(Channel ch, String uid) {
        StopWatch t = new StopWatch();
        t.start("BIND-ALL");
        if (!super.contains(ch)) {
            super.add(ch);
        }
        t.stop();
        t.start("BIND-USER");
        boolean bResult = groups.bind(ch, PREFIX_BIND_USER_GROUP.concat(uid)).contains(ch);
        t.stop();
        if (t.getTotalTimeMillis() > 50) {
            logger.info("{}新连接接入,在线:{}人,总用时：{}ms,\n{}", ch, super.size(), t.getTotalTimeMillis(), t.prettyPrint());
        }
        return bResult;
    }

    /**
     * 集群通知 Session上线
     *
     * @param session
     */
    public void kick(Session session) {
        Sessions sessions = groups.get(PREFIX_BIND_USER_GROUP.concat(session.getUid()));
        if (sessions != null) {
            ChannelMatcher m = w -> (!w.id().asShortText().equalsIgnoreCase(session.getCid())) && w.attr(ChannelAttr.PLATFORM).get().equals(session.getPlatform().name());
            Iterator<Channel> users = sessions.iterator();
            while (users.hasNext()) {
                Channel ch = users.next();
                if (m.matches(ch)) {
                    offline(ch, session);
                    break;
                }
            }
        }
    }

    /**
     * 让连接通道下线
     */
    private void offline(Channel oldChannel, Session session) {
        Reply reply = AIM.request.onKick(oldChannel, session);
        //通信旧连接下线
        logger.warn("{}通知用户ID【{}】旧连接下线", oldChannel, session.getUid());
        if (oldChannel.isWritable()) {
            oldChannel.writeAndFlush(reply);
        }
        //关闭旧连接
        BaseServerHandler b = (BaseServerHandler) oldChannel.pipeline().get(AIMConstant.PIPELINE_SOCKET);
        b.onClose(oldChannel, ChannelClose.KILL);
    }

    /**
     * 获取客户端IP
     *
     * @param ch
     * @return
     */
    public String getClientIP(Channel ch) {
        InetSocketAddress ip = (InetSocketAddress) ch.remoteAddress();
        return ip.getAddress().getHostAddress();
    }

    /**
     * 获取 bindUser设定的UID
     *
     * @param ch
     * @return
     */
    public String getUId(Channel ch) {
        return ch.attr(ChannelAttr.UID).get();
    }

    /**
     * 获取
     *
     * @param ch
     * @param key
     */
    public Object getAttr(Channel ch, String key) {
        return ch.attr(AttributeKey.valueOf(key)).get();
    }

    /**
     * 设置属性
     *
     * @param ch
     * @param key
     * @param val
     * @param <T>
     */
    public <T extends Class> void setAttr(Channel ch, String key, T val) {
        ch.attr(AttributeKey.valueOf(key)).set(val);
    }

    /**
     * 绑定
     */
    public void bindUid(Channel ch, String userId) {
        bindUser(ch, userId, ChannelPlatform.NON, "");
    }

    /**
     * 绑定
     */
    public void bindUser(Channel ch, String userId, String deviceId) {
        bindUser(ch, userId, ChannelPlatform.NON, deviceId);
    }

    /**
     * 绑定
     *
     * @param ch
     * @param userId
     * @param platform
     */
    public Boolean bindUser(Channel ch, String userId, ChannelPlatform platform, String deviceId) {
        Optional<Channel> s = super.stream().filter(w -> w.attr(ChannelAttr.UID).get().equals(userId) && w.attr(ChannelAttr.PLATFORM).get().equals(platform.name())).findFirst();
        if (s.isPresent()) {
            Channel oldCh = s.get();
            offline(oldCh, new Session(userId, ch.id(), platform, getClientIP(ch), deviceId));
        } else if (clusterProperties.getMode() != ClusterProperties.ClusterMode.None) {
            ApplicationContextUtil.getBean(ClusterFactory.class).kick(new Session(userId, ch.id(), platform, getClientIP(ch), deviceId));
        }

        ch.attr(ChannelAttr.UID).set(userId);
        ch.attr(ChannelAttr.PLATFORM).set(platform.name());
        ch.attr(ChannelAttr.DEVICE_ID).set(deviceId);
        ch.attr(ChannelAttr.UIP).set(getClientIP(ch));
        return bind(ch, userId);
    }

    static long bytes = 0;

    /**
     * 向当前群组发送消息[不含集群]
     *
     * @param msg
     * @return
     */
    public void sends(Transportable msg) {
        sends(w -> w.isActive(), msg);
    }

    /**
     * 向用户发送消息[含集群]
     *
     * @param uid
     * @param msg
     */
    public void send(String uid, Transportable msg) {
        if (uid == null || uid.isEmpty()) {
            logger.error("Channel未绑定uid={}，无法发送!", uid);
        } else if (clusterProperties.getMode() != ClusterProperties.ClusterMode.None) {
            ApplicationContextUtil.getBean(ClusterFactory.class).push(PREFIX_BIND_USER_GROUP.concat(uid), msg);
        } else {
            groups.get(PREFIX_BIND_USER_GROUP.concat(uid)).sends(msg);
        }
    }

    /**
     * 向不包含ch的其它通道发送消息
     *
     * @param ch
     * @param msg
     */
    public ChannelGroupFuture sends(Channel ch, Transportable msg) {
        return sends(w -> w.id() != ch.id(), msg);
    }

    /**
     * 向通道Matcher匹配到的连接发送信息
     *
     * @param matcher 通道匹配表达式
     * @param msg     消息体
     * @return 结果
     */
    public ChannelGroupFuture sends(ChannelMatcher matcher, Transportable msg) {
        ChannelGroupFuture f = super.writeAndFlush(msg, matcher, true);
        return f;
    }
}
